package com.baomidou.springboot.spark;

import com.alibaba.fastjson.JSONObject;
import com.baomidou.springboot.constant.Constants;
import com.baomidou.springboot.entity.SessionAggrStat;
import com.baomidou.springboot.utils.*;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;

import java.util.*;

/**
 * Created by pcmmm on 17/6/9.
 */
public class SessionAnalysis {
    /**
     * Builds a pair RDD that maps each session id to its action row.
     *
     * <p>The session id is read as a string from column 0 of every row. Rows are converted one
     * partition at a time; all pairs of a partition are collected into a list before being
     * handed back to Spark.
     *
     * @param actionRDD RDD of action rows whose column 0 holds the session id
     * @return pair RDD of {@code <sessionid, actionRow>}
     */
    public static JavaPairRDD<String, Row> getSessionid2ActionRDD(JavaRDD<Row> actionRDD) {
        PairFlatMapFunction<Iterator<Row>, String, Row> keyBySessionId =
                new PairFlatMapFunction<Iterator<Row>, String, Row>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterable<Tuple2<String, Row>> call(Iterator<Row> partitionRows)
                            throws Exception {
                        List<Tuple2<String, Row>> pairs = new ArrayList<Tuple2<String, Row>>();
                        while (partitionRows.hasNext()) {
                            Row actionRow = partitionRows.next();
                            // Column 0 (session id) becomes the key.
                            String sessionId = actionRow.getString(0);
                            pairs.add(new Tuple2<String, Row>(sessionId, actionRow));
                        }
                        return pairs;
                    }
                };

        return actionRDD.mapPartitionsToPair(keyBySessionId);
    }

    /**
     * Aggregates action rows per session and joins each session with its user's profile.
     *
     * <p>For every session the following are collected from its action rows: the distinct
     * search keywords (column 5), the distinct clicked category ids (column 6), the visit
     * length in seconds (latest minus earliest action time, column 4), the step length
     * (number of action rows) and the start time. The user id is taken from column 1 of the
     * first row seen for the session.
     *
     * <p>The partial result is keyed by user id so it can be joined directly with the
     * {@code user_info} table (column 0 = user id, 3 = age, 4 = professional, 5 = city,
     * 6 = sex) without an extra re-keying pass. The join is an inner join, so sessions whose
     * user id has no {@code user_info} row do not appear in the result.
     *
     * @param sc Spark context; not used by this method
     * @param sqlContext SQL context used to query the {@code user_info} table
     * @param sessinoid2actionRDD pair RDD of {@code <sessionid, actionRow>}
     * @return pair RDD of {@code <sessionid, fullAggrInfo>}, where {@code fullAggrInfo} is a
     *         {@code key=value|key=value} string
     */
    public static JavaPairRDD<String, String> aggregateBySession(
            JavaSparkContext sc,
            SQLContext sqlContext,
            JavaPairRDD<String, Row> sessinoid2actionRDD) {
        // Group the action rows by session.
        JavaPairRDD<String, Iterable<Row>> sessionid2ActionsRDD =
                sessinoid2actionRDD.groupByKey();

        // Collapse every session into one record:
        // <userid, partAggrInfo(sessionid, searchKeywords, clickCategoryIds, ...)>
        JavaPairRDD<Long, String> userid2PartAggrInfoRDD = sessionid2ActionsRDD.mapToPair(

                new PairFunction<Tuple2<String, Iterable<Row>>, Long, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Long, String> call(Tuple2<String, Iterable<Row>> tuple)
                            throws Exception {
                        String sessionid = tuple._1;

                        // The buffers keep first-seen order for the output string; the sets
                        // give exact-match de-duplication (a substring test would wrongly
                        // treat category 1 as already present once 12 has been seen).
                        StringBuilder searchKeywordsBuffer = new StringBuilder();
                        StringBuilder clickCategoryIdsBuffer = new StringBuilder();
                        Set<String> seenKeywords = new HashSet<String>();
                        Set<Long> seenCategoryIds = new HashSet<Long>();

                        Long userid = null;

                        // Earliest and latest action time of the session.
                        Date startTime = null;
                        Date endTime = null;
                        // Number of actions in the session.
                        int stepLength = 0;

                        for (Row row : tuple._2) {
                            if (userid == null) {
                                userid = row.getLong(1);
                            }

                            // Only search actions carry a keyword and only category-click
                            // actions carry a category id, so either column may be null.
                            String searchKeyword = row.getString(5);
                            Long clickCategoryId = null;
                            if (!row.isNullAt(6)) {
                                clickCategoryId = row.getLong(6);
                            }

                            if (StringUtils.isNotEmpty(searchKeyword)
                                    && seenKeywords.add(searchKeyword)) {
                                searchKeywordsBuffer.append(searchKeyword).append(",");
                            }
                            if (clickCategoryId != null
                                    && seenCategoryIds.add(clickCategoryId)) {
                                clickCategoryIdsBuffer.append(clickCategoryId).append(",");
                            }

                            // Track the session's start and end time.
                            Date actionTime = DateUtils.parseTime(row.getString(4));

                            if (startTime == null) {
                                startTime = actionTime;
                            }
                            if (endTime == null) {
                                endTime = actionTime;
                            }

                            if (actionTime.before(startTime)) {
                                startTime = actionTime;
                            }
                            if (actionTime.after(endTime)) {
                                endTime = actionTime;
                            }

                            stepLength++;
                        }

                        String searchKeywords =
                                StringUtils.trimComma(searchKeywordsBuffer.toString());
                        String clickCategoryIds =
                                StringUtils.trimComma(clickCategoryIdsBuffer.toString());

                        // Visit length in seconds.
                        long visitLength = (endTime.getTime() - startTime.getTime()) / 1000;

                        // The record is keyed by userid rather than sessionid because the next
                        // step joins with <userid, Row> user data; the sessionid is restored
                        // as the key after the join.
                        // Aggregated fields are concatenated as key=value|key=value.
                        String partAggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionid + "|"
                                + Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeywords + "|"
                                + Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds + "|"
                                + Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|"
                                + Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|"
                                + Constants.FIELD_START_TIME + "=" + DateUtils.formatTime(startTime);

                        return new Tuple2<Long, String>(userid, partAggrInfo);
                    }

                });

        // Load all users and key them by user id: <userid, Row>.
        String sql = "select * from user_info";
        JavaRDD<Row> userInfoRDD = sqlContext.sql(sql).javaRDD();

        JavaPairRDD<Long, Row> userid2InfoRDD = userInfoRDD.mapToPair(

                new PairFunction<Row, Long, Row>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<Long, Row> call(Row row) throws Exception {
                        return new Tuple2<Long, Row>(row.getLong(0), row);
                    }

                });

        // NOTE(review): if user_info is much smaller than the session data, this reduce-side
        // join is a candidate for conversion to a map-side (broadcast) join.

        // Join the per-session aggregates with the user data.
        JavaPairRDD<Long, Tuple2<String, Row>> userid2FullInfoRDD =
                userid2PartAggrInfoRDD.join(userid2InfoRDD);

        // Append the user fields and re-key by session id: <sessionid, fullAggrInfo>.
        JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2FullInfoRDD.mapToPair(

                new PairFunction<Tuple2<Long, Tuple2<String, Row>>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(
                            Tuple2<Long, Tuple2<String, Row>> tuple)
                            throws Exception {
                        String partAggrInfo = tuple._2._1;
                        Row userInfoRow = tuple._2._2;

                        String sessionid = StringUtils.getFieldFromConcatString(
                                partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);

                        int age = userInfoRow.getInt(3);
                        String professional = userInfoRow.getString(4);
                        String city = userInfoRow.getString(5);
                        String sex = userInfoRow.getString(6);

                        String fullAggrInfo = partAggrInfo + "|"
                                + Constants.FIELD_AGE + "=" + age + "|"
                                + Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
                                + Constants.FIELD_CITY + "=" + city + "|"
                                + Constants.FIELD_SEX + "=" + sex;

                        return new Tuple2<String, String>(sessionid, fullAggrInfo);
                    }

                });
        return sessionid2FullAggrInfoRDD;
    }

    /**
     * Filters aggregated sessions by the task parameters and records statistics for every
     * session that passes.
     *
     * <p>A session is kept only if it satisfies all of: age range, city, sex, profession,
     * search keyword and clicked category filters. For each kept session the accumulator
     * receives {@code Constants.SESSION_COUNT} plus the matching start-hour, city,
     * visit-length and step-length keys. A visit length of 0 seconds falls into no
     * visit-length bucket.
     *
     * <p>NOTE(review): the accumulator is updated inside a {@code filter} transformation, so
     * updates happen only when an action runs on the returned RDD, and Spark may apply them
     * more than once if the stage is re-executed or the RDD is recomputed.
     *
     * @param sessionid2AggrInfoRDD pair RDD of {@code <sessionid, fullAggrInfo>}
     * @param taskParam task parameters holding the filter values
     * @param sessionAggrStatAccumulator accumulator that receives one key per counted event
     * @return the sessions that passed every filter, in the same
     *         {@code <sessionid, fullAggrInfo>} format
     */
    public static JavaPairRDD<String, String> filterSessionAndAggrStat(
            JavaPairRDD<String, String> sessionid2AggrInfoRDD,
            final JSONObject taskParam,
            final Accumulator<String> sessionAggrStatAccumulator){
        // Concatenate all filter parameters into one key=value|key=value string.
        String startAge = ParamUtils.getParamWithSplit(taskParam, Constants.PARAM_START_AGE);
        String endAge = ParamUtils.getParamWithSplit(taskParam, Constants.PARAM_END_AGE);
        String sex = ParamUtils.getParamWithSplit(taskParam, Constants.PARAM_SEX);
        String cities = ParamUtils.getParamWithSplit(taskParam, Constants.PARAM_CITIES);
        String professionals = ParamUtils.getParamWithSplit(taskParam, Constants.PARAM_PROFESSIONALS);
        String keywords = ParamUtils.getParamWithSplit(taskParam, Constants.PARAM_KEYWORDS);
        String categoryIds = ParamUtils.getParamWithSplit(taskParam, Constants.PARAM_CATEGORY_IDS);

        String joinedParams = (startAge != null ? Constants.PARAM_START_AGE + "=" + startAge + "|" : "")
                + (endAge != null ? Constants.PARAM_END_AGE + "=" + endAge + "|" : "")
                + (professionals != null ? Constants.PARAM_PROFESSIONALS + "=" + professionals + "|" : "")
                + (cities != null ? Constants.PARAM_CITIES + "=" + cities + "|" : "")
                + (sex != null ? Constants.PARAM_SEX + "=" + sex + "|" : "")
                + (keywords != null ? Constants.PARAM_KEYWORDS + "=" + keywords + "|" : "")
                + (categoryIds != null ? Constants.PARAM_CATEGORY_IDS + "=" + categoryIds : "");

        // endsWith() compares literally (it is not a regex), so the separator is a bare "|".
        if (joinedParams.endsWith("|")) {
            joinedParams = joinedParams.substring(0, joinedParams.length() - 1);
        }

        final String parameter = joinedParams;

        JavaPairRDD<String, String> filteredSessionid2AggrInfoRDD = sessionid2AggrInfoRDD.filter(
                new Function<Tuple2<String, String>, Boolean>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Tuple2<String, String> tuple) throws Exception {
                        String aggrInfo = tuple._2;

                        // Age range (startAge, endAge).
                        if (!ValidUtils.between(aggrInfo, Constants.FIELD_AGE,
                                parameter, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE)) {
                            return false;
                        }

                        // City.
                        if (!ValidUtils.in(aggrInfo, Constants.FIELD_CITY,
                                parameter, Constants.PARAM_CITIES)) {
                            return false;
                        }

                        // Sex.
                        if (!ValidUtils.equal(aggrInfo, Constants.FIELD_SEX,
                                parameter, Constants.PARAM_SEX)) {
                            return false;
                        }

                        // Profession.
                        if (!ValidUtils.in(aggrInfo, Constants.FIELD_PROFESSIONAL,
                                parameter, Constants.PARAM_PROFESSIONALS)) {
                            return false;
                        }

                        // Search keywords.
                        if (!ValidUtils.in(aggrInfo, Constants.FIELD_SEARCH_KEYWORDS,
                                parameter, Constants.PARAM_KEYWORDS)) {
                            return false;
                        }

                        // Clicked category ids; the field name must match the one used when
                        // the aggregate string was built.
                        if (!ValidUtils.in(aggrInfo, Constants.FIELD_CLICK_CATEGORY_IDS,
                                parameter, Constants.PARAM_CATEGORY_IDS)) {
                            return false;
                        }

                        // The session passed every filter: count it and bucket its metrics.
                        sessionAggrStatAccumulator.add(Constants.SESSION_COUNT);

                        long visitLength = Long.parseLong(StringUtils.getFieldFromConcatString(
                                aggrInfo, "\\|", Constants.FIELD_VISIT_LENGTH));
                        long stepLength = Long.parseLong(StringUtils.getFieldFromConcatString(
                                aggrInfo, "\\|", Constants.FIELD_STEP_LENGTH));
                        String city = StringUtils.getFieldFromConcatString(
                                aggrInfo, "\\|", Constants.FIELD_CITY);
                        String time = StringUtils.getFieldFromConcatString(
                                aggrInfo, "\\|", Constants.FIELD_START_TIME);

                        calculateTime(time);
                        calculateCity(city);
                        calculateVisitLength(visitLength);
                        calculateStepLength(stepLength);
                        return true;
                    }

                    /**
                     * Counts the session under its city if the city is one of the tracked ones.
                     *
                     * @param city city of the session's user
                     */
                    private void calculateCity(String city) {
                        String[] trackedCities = {
                                Constants.CITY_BEIJING, Constants.CITY_GUANGZHOU,
                                Constants.CITY_SHANGHAI, Constants.CITY_SHENZHEN,
                                Constants.CITY_HANGZHOU, Constants.CITY_NANJING
                        };
                        for (String tracked : trackedCities) {
                            if (tracked.equals(city)) {
                                sessionAggrStatAccumulator.add(tracked);
                                return;
                            }
                        }
                    }

                    /**
                     * Counts the session under the hour-of-day bucket of its start time.
                     *
                     * @param time session start time as stored in the aggregate string
                     */
                    private void calculateTime(String time) {
                        // Extract the time-of-day part once instead of once per bucket.
                        String timeOfDay = DateUtils.getTimeHours(time);
                        // Index h holds the accumulator key for the hour [h:00:00, h:59:59].
                        String[] hourKeys = {
                                Constants.TIME_0_1, Constants.TIME_1_2, Constants.TIME_2_3,
                                Constants.TIME_3_4, Constants.TIME_4_5, Constants.TIME_5_6,
                                Constants.TIME_6_7, Constants.TIME_7_8, Constants.TIME_8_9,
                                Constants.TIME_9_10, Constants.TIME_10_11, Constants.TIME_11_12,
                                Constants.TIME_12_13, Constants.TIME_13_14, Constants.TIME_14_15,
                                Constants.TIME_15_16, Constants.TIME_16_17, Constants.TIME_17_18,
                                Constants.TIME_18_19, Constants.TIME_19_20, Constants.TIME_20_21,
                                Constants.TIME_21_22, Constants.TIME_22_23, Constants.TIME_23_0
                        };
                        for (int hour = 0; hour < hourKeys.length; hour++) {
                            String hh = (hour < 10 ? "0" : "") + hour;
                            if (DateUtils.timeBetween(timeOfDay, hh + ":00:00", hh + ":59:59")) {
                                sessionAggrStatAccumulator.add(hourKeys[hour]);
                                return;
                            }
                        }
                    }

                    /**
                     * Counts the session under its visit-length bucket.
                     *
                     * @param visitLength visit length in seconds
                     */
                    private void calculateVisitLength(long visitLength) {
                        if (visitLength >= 1 && visitLength <= 3) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1s_3s);
                        } else if (visitLength >= 4 && visitLength <= 6) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_4s_6s);
                        } else if (visitLength >= 7 && visitLength <= 9) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_7s_9s);
                        } else if (visitLength >= 10 && visitLength <= 30) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10s_30s);
                        } else if (visitLength > 30 && visitLength <= 60) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30s_60s);
                        } else if (visitLength > 60 && visitLength <= 180) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1m_3m);
                        } else if (visitLength > 180 && visitLength <= 600) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_3m_10m);
                        } else if (visitLength > 600 && visitLength <= 1800) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10m_30m);
                        } else if (visitLength > 1800) {
                            sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30m);
                        }
                    }

                    /**
                     * Counts the session under its step-length bucket.
                     *
                     * @param stepLength number of actions in the session
                     */
                    private void calculateStepLength(long stepLength) {
                        if (stepLength >= 1 && stepLength <= 3) {
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_1_3);
                        } else if (stepLength >= 4 && stepLength <= 6) {
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_4_6);
                        } else if (stepLength >= 7 && stepLength <= 9) {
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_7_9);
                        } else if (stepLength >= 10 && stepLength <= 30) {
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_10_30);
                        } else if (stepLength > 30 && stepLength <= 60) {
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_30_60);
                        } else if (stepLength > 60) {
                            sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);
                        }
                    }

                }
        );

        return filteredSessionid2AggrInfoRDD;
    }

    /**
     * Returns the action rows that belong to sessions present in the aggregate RDD.
     *
     * <p>The two RDDs are joined on session id and the aggregate string is then dropped, so
     * the result holds one {@code <sessionid, actionRow>} pair per joined action row.
     *
     * @param sessionid2aggrInfoRDD pair RDD of {@code <sessionid, aggrInfo>}
     * @param sessionid2actionRDD pair RDD of {@code <sessionid, actionRow>}
     * @return pair RDD of {@code <sessionid, actionRow>} restricted to the joined sessions
     */
    public static JavaPairRDD<String, Row> getSessionid2detailRDD(
            JavaPairRDD<String, String> sessionid2aggrInfoRDD,
            JavaPairRDD<String, Row> sessionid2actionRDD) {
        JavaPairRDD<String, Tuple2<String, Row>> joinedBySession =
                sessionid2aggrInfoRDD.join(sessionid2actionRDD);

        PairFunction<Tuple2<String, Tuple2<String, Row>>, String, Row> keepActionRow =
                new PairFunction<Tuple2<String, Tuple2<String, Row>>, String, Row>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Row> call(
                            Tuple2<String, Tuple2<String, Row>> joined) throws Exception {
                        String sessionId = joined._1;
                        Row actionRow = joined._2._2;
                        return new Tuple2<String, Row>(sessionId, actionRow);
                    }
                };

        return joinedBySession.mapToPair(keepActionRow);
    }

    /**
     * Converts the accumulator's statistics string into a {@link SessionAggrStat}.
     *
     * <p>Every visit-length and step-length bucket count is read from {@code value} and
     * divided by the total session count, rounded to 2 decimal places. When the session count
     * is 0 every ratio is reported as 0.0, because dividing 0.0 by 0.0 would give NaN.
     *
     * @param value accumulator value in {@code key=count|key=count} format
     * @param taskid id of the task the statistics belong to
     * @return the populated statistics object
     * @throws NumberFormatException if a required count is missing or not a number
     */
    public static SessionAggrStat calculateAggrStat(String value, long taskid) {
        long sessionCount = readStatCount(value, Constants.SESSION_COUNT);

        SessionAggrStat sessionAggrStat = new SessionAggrStat();
        sessionAggrStat.setTaskid(taskid);
        sessionAggrStat.setSession_count(sessionCount);

        // Visit-length buckets.
        sessionAggrStat.setVisit_length_1s_3s_ratio(
                sessionRatio(value, Constants.TIME_PERIOD_1s_3s, sessionCount));
        sessionAggrStat.setVisit_length_4s_6s_ratio(
                sessionRatio(value, Constants.TIME_PERIOD_4s_6s, sessionCount));
        sessionAggrStat.setVisit_length_7s_9s_ratio(
                sessionRatio(value, Constants.TIME_PERIOD_7s_9s, sessionCount));
        sessionAggrStat.setVisit_length_10s_30s_ratio(
                sessionRatio(value, Constants.TIME_PERIOD_10s_30s, sessionCount));
        sessionAggrStat.setVisit_length_30s_60s_ratio(
                sessionRatio(value, Constants.TIME_PERIOD_30s_60s, sessionCount));
        sessionAggrStat.setVisit_length_1m_3m_ratio(
                sessionRatio(value, Constants.TIME_PERIOD_1m_3m, sessionCount));
        sessionAggrStat.setVisit_length_3m_10m_ratio(
                sessionRatio(value, Constants.TIME_PERIOD_3m_10m, sessionCount));
        sessionAggrStat.setVisit_length_10m_30m_ratio(
                sessionRatio(value, Constants.TIME_PERIOD_10m_30m, sessionCount));
        sessionAggrStat.setVisit_length_30m_ratio(
                sessionRatio(value, Constants.TIME_PERIOD_30m, sessionCount));

        // Step-length buckets.
        sessionAggrStat.setStep_length_1_3_ratio(
                sessionRatio(value, Constants.STEP_PERIOD_1_3, sessionCount));
        sessionAggrStat.setStep_length_4_6_ratio(
                sessionRatio(value, Constants.STEP_PERIOD_4_6, sessionCount));
        sessionAggrStat.setStep_length_7_9_ratio(
                sessionRatio(value, Constants.STEP_PERIOD_7_9, sessionCount));
        sessionAggrStat.setStep_length_10_30_ratio(
                sessionRatio(value, Constants.STEP_PERIOD_10_30, sessionCount));
        sessionAggrStat.setStep_length_30_60_ratio(
                sessionRatio(value, Constants.STEP_PERIOD_30_60, sessionCount));
        sessionAggrStat.setStep_length_60_ratio(
                sessionRatio(value, Constants.STEP_PERIOD_60, sessionCount));

        return sessionAggrStat;
    }

    /** Reads the count stored under {@code field} in a {@code key=count|key=count} string. */
    private static long readStatCount(String value, String field) {
        return Long.parseLong(StringUtils.getFieldFromConcatString(value, "\\|", field));
    }

    /** Returns the count under {@code field} divided by {@code sessionCount}, to 2 decimals. */
    private static double sessionRatio(String value, String field, long sessionCount) {
        // Parse first so a malformed statistics string fails even when there are no sessions.
        long count = readStatCount(value, field);
        if (sessionCount == 0) {
            return 0.0;
        }
        return NumberUtils.formatDouble((double) count / (double) sessionCount, 2);
    }

    /**
     * Extracts the per-city session counts from the accumulator string and returns them as
     * a JSON object.
     *
     * <p>Despite the method name, the JSON values are the raw counts, not ratios. The keys
     * are {@code beijing}, {@code shanghai}, {@code guangzhou}, {@code shenzhen},
     * {@code nanjing} and {@code hangzhou}.
     *
     * @param value accumulator value in {@code key=count|key=count} format
     * @return JSON string mapping each city key to its session count
     * @throws NumberFormatException if a city count is missing or not a number
     */
    public static String calulateCityRatio(String value){
        Map<String, Long> cityCounts = new HashMap<String, Long>();
        cityCounts.put("beijing", Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.CITY_BEIJING)));
        cityCounts.put("shanghai", Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.CITY_SHANGHAI)));
        cityCounts.put("guangzhou", Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.CITY_GUANGZHOU)));
        cityCounts.put("shenzhen", Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.CITY_SHENZHEN)));
        cityCounts.put("nanjing", Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.CITY_NANJING)));
        cityCounts.put("hangzhou", Long.valueOf(StringUtils.getFieldFromConcatString(
                value, "\\|", Constants.CITY_HANGZHOU)));
        return JSONObject.toJSONString(cityCounts);
    }

    /**
     * Extracts the 24 hourly counters from a concatenated statistics string
     * and serializes them as a JSON object.
     *
     * <p>Despite the method name, no ratio is computed here: each field is
     * parsed as a {@code long} and emitted unchanged under a fixed JSON key
     * of the form {@code t_<from>_<to>} ({@code t_0_1} through
     * {@code t_23_0}).
     *
     * @param value concatenated statistics string; fields are looked up via
     *              {@code StringUtils.getFieldFromConcatString} using
     *              {@code "\\|"} as the delimiter pattern
     *              (NOTE(review): presumably {@code key=value} pairs joined
     *              by {@code |} - confirm against the helper)
     * @return JSON text of a map from hour-bucket key to its parsed counter;
     *         the map is a {@link HashMap}, so key order in the output is
     *         not defined by insertion order
     * @throws NumberFormatException if a looked-up field is {@code null} or
     *         is not a valid decimal {@code long}
     */
    public static String calulateTimeRatio(String value){
        // Each row pairs the JSON output key with the field name to look up.
        final String[][] hourFields = {
                {"t_0_1", Constants.TIME_0_1},
                {"t_1_2", Constants.TIME_1_2},
                {"t_2_3", Constants.TIME_2_3},
                {"t_3_4", Constants.TIME_3_4},
                {"t_4_5", Constants.TIME_4_5},
                {"t_5_6", Constants.TIME_5_6},
                {"t_6_7", Constants.TIME_6_7},
                {"t_7_8", Constants.TIME_7_8},
                {"t_8_9", Constants.TIME_8_9},
                {"t_9_10", Constants.TIME_9_10},
                {"t_10_11", Constants.TIME_10_11},
                {"t_11_12", Constants.TIME_11_12},
                {"t_12_13", Constants.TIME_12_13},
                {"t_13_14", Constants.TIME_13_14},
                {"t_14_15", Constants.TIME_14_15},
                {"t_15_16", Constants.TIME_15_16},
                {"t_16_17", Constants.TIME_16_17},
                {"t_17_18", Constants.TIME_17_18},
                {"t_18_19", Constants.TIME_18_19},
                {"t_19_20", Constants.TIME_19_20},
                {"t_20_21", Constants.TIME_20_21},
                {"t_21_22", Constants.TIME_21_22},
                {"t_22_23", Constants.TIME_22_23},
                {"t_23_0", Constants.TIME_23_0}
        };

        // Default-capacity HashMap on purpose: the serialized key order
        // follows the map's iteration order, so the container is unchanged.
        Map<String, Long> hourCounts = new HashMap<String, Long>();
        for (String[] hourField : hourFields) {
            String rawCount = StringUtils.getFieldFromConcatString(
                    value, "\\|", hourField[1]);
            hourCounts.put(hourField[0], Long.parseLong(rawCount));
        }
        return JSONObject.toJSONString(hourCounts);
    }

}
